8.05. Реактивная коммуникация
Реактивная коммуникация
Что такое реактивная коммуникация?
Реактивные взаимодействия фокусируются на обмене событиями в режиме реального времени. Системы реагируют на события по мере их возникновения, обеспечивая непрерывный поток данных.
Транспорт:
- WebSockets
- SSE
- Kafka Streams
- MQTT
WebSockets
WebSockets является самым распространённым представителем реактивной коммуникации, ведь устанавливается двусторонний канал связи между клиентом и сервером, который позволяет обмениваться данными в реальном времени. Это протокол для двусторонней связи между клиентом и сервером через единственный постоянный канал. После установления соединения данные передаются без дополнительных HTTP-заголовков.
WebSocket — протокол полнодуплексной связи поверх единого TCP-соединения, обеспечивающий низколатентный обмен данными между клиентом и сервером в режиме реального времени.
Архитектурные особенности
Двунаправленная связь
- После установления соединения данные передаются в обоих направлениях без необходимости повторного рукопожатия.
- Сервер может инициировать отправку сообщений клиенту без предварительного запроса (push-модель).
Единое соединение
- Сохраняется постоянное соединение на протяжении всего сеанса работы.
- Устраняется оверхед повторного установления соединения для каждого сообщения (в отличие от HTTP).
Низкая задержка
- Минимальные накладные расходы на передачу: заголовок фрейма составляет 2–14 байт.
- Отсутствие необходимости в заголовках HTTP для каждого сообщения.
Работа через прокси и брандмауэры
- Использует стандартные порты 80 (ws) и 443 (wss), совместим с существующей веб-инфраструктурой.
- Начинается с HTTP-рукопожатия, что позволяет проходить через большинство прокси.
Протокол и жизненный цикл соединения
1. Рукопожатие (Handshake)
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
- Клиент отправляет обычный HTTP-запрос с заголовками
Upgrade: websocket. - Сервер отвечает кодом 101 (Switching Protocols), подтверждая переход на протокол WebSocket.
- Заголовок
Sec-WebSocket-Keyиспользуется для предотвращения кэширования и проверки подлинности.
2. Передача данных
- Данные передаются в виде фреймов (frames) с минимальным заголовком.
- Каждый фрейм содержит:
- Флаги (FIN, RSV1-3, opcode)
- Длина полезной нагрузки (7, 7+16, или 7+64 бита)
- Маска (4 байта, обязательна для клиентских фреймов)
- Полезная нагрузка
3. Закрытие соединения
0x88 0x00 // FIN, opcode=Close, length=0
- Грациозное закрытие через контрольный фрейм CLOSE.
- Код состояния (1000 — нормальное закрытие, 1001 — уход, 4000+ — пользовательские коды).
Типы фреймов
| Opcode | Назначение | Описание |
|---|---|---|
| 0x0 | Continuation | Продолжение фрагментированного сообщения |
| 0x1 | Text | Текстовые данные в UTF-8 |
| 0x2 | Binary | Бинарные данные |
| 0x8 | Close | Запрос на закрытие соединения |
| 0x9 | Ping | Проверка активности соединения |
| 0xA | Pong | Ответ на Ping |
Механизмы надёжности
Контроль целостности
- Каждое сообщение может быть фрагментировано на несколько фреймов с восстановлением на стороне получателя.
- Проверка порядка фреймов и сборка полного сообщения.
Проверка активности (Heartbeat)
- Ping/Pong фреймы для обнаружения разрыва соединения.
- Типичный интервал: 30–60 секунд.
Переподключение
- Автоматическое восстановление соединения при обнаружении разрыва.
- Экспоненциальная задержка между попытками для предотвращения шторма запросов.
Аутентификация
- Токены передаются в начальном HTTP-рукопожатии (Authorization header).
- Поддержка сессий через cookies (если разрешено CORS).
Типичные сценарии применения
Чаты и мессенджеры
- Мгновенная доставка сообщений между пользователями.
- Индикация набора текста, статус онлайн/оффлайн.
Коллаборативные приложения
- Совместное редактирование документов в реальном времени.
- Синхронизация состояния между несколькими клиентами.
Финансовые приложения
- Потоковые котировки на бирже.
- Уведомления о транзакциях.
Игры
- Синхронизация игрового состояния между игроками.
- Мультиплеерные взаимодействия с минимальной задержкой.
Мониторинг и телеметрия
- Поток метрик серверов и приложений.
- Обновление дашбордов в реальном времени.
Уведомления
- Push-уведомления в веб-приложениях.
- Событийные оповещения без опроса.
JavaScript (браузерный клиент)
// Создание соединения
const ws = new WebSocket('wss://example.com/chat');
// Обработчики событий
ws.addEventListener('open', (event) => {
console.log('Соединение установлено');
// Отправка сообщения после установки соединения
ws.send(JSON.stringify({
type: 'join',
room: 'general',
username: 'Тимур'
}));
});
ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
console.log('Получено сообщение:', data);
switch (data.type) {
case 'message':
displayMessage(data.username, data.text);
break;
case 'user_joined':
updateUserList(data.users);
break;
}
});
ws.addEventListener('error', (event) => {
console.error('Ошибка соединения:', event);
});
ws.addEventListener('close', (event) => {
console.log('Соединение закрыто:', event.code, event.reason);
// Автоматическое переподключение
setTimeout(() => reconnect(), 3000);
});
// Отправка сообщения
function sendMessage(text) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'message',
text: text,
timestamp: Date.now()
}));
}
}
// Проверка активности
let pingInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
ws.addEventListener('close', () => {
clearInterval(pingInterval);
});
Node.js (сервер на ws)
const WebSocket = require('ws');
const http = require('http');
// HTTP сервер для обслуживания статики
const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('<h1>WebSocket Chat</h1>');
});
// WebSocket сервер
const wss = new WebSocket.Server({ server });
// Хранилище подключений
const clients = new Map();
const rooms = new Map();
wss.on('connection', (ws, req) => {
const clientId = generateId();
clients.set(clientId, ws);
console.log(`Новое соединение: ${clientId}`);
// Обработка сообщений
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleClientMessage(clientId, message);
} catch (error) {
console.error('Ошибка обработки сообщения:', error);
}
});
// Обработка закрытия соединения
ws.on('close', () => {
console.log(`Соединение закрыто: ${clientId}`);
handleClientDisconnect(clientId);
clients.delete(clientId);
});
// Обработка ошибок
ws.on('error', (error) => {
console.error(`Ошибка соединения ${clientId}:`, error);
clients.delete(clientId);
});
// Отправка приветственного сообщения
ws.send(JSON.stringify({
type: 'welcome',
clientId: clientId,
timestamp: Date.now()
}));
});
function handleClientMessage(clientId, message) {
const ws = clients.get(clientId);
if (!ws || ws.readyState !== WebSocket.OPEN) return;
switch (message.type) {
case 'join':
joinRoom(clientId, message.room, message.username);
break;
case 'message':
broadcastToRoom(message.room, {
type: 'message',
username: message.username,
text: message.text,
timestamp: Date.now()
}, clientId);
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;
default:
console.warn(`Неизвестный тип сообщения: ${message.type}`);
}
}
function joinRoom(clientId, roomName, username) {
if (!rooms.has(roomName)) {
rooms.set(roomName, new Set());
}
const room = rooms.get(roomName);
room.add(clientId);
// Уведомить других участников комнаты
broadcastToRoom(roomName, {
type: 'user_joined',
username: username,
clientId: clientId,
users: Array.from(room).map(id => ({
id: id,
username: getUsername(id)
}))
});
// Подтвердить присоединение
const ws = clients.get(clientId);
if (ws) {
ws.send(JSON.stringify({
type: 'joined',
room: roomName,
users: Array.from(room)
}));
}
}
function broadcastToRoom(roomName, message, excludeClientId = null) {
const room = rooms.get(roomName);
if (!room) return;
const data = JSON.stringify(message);
room.forEach(clientId => {
if (clientId === excludeClientId) return;
const ws = clients.get(clientId);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(data);
} else {
// Очистить мёртвые соединения
room.delete(clientId);
}
});
// Удалить пустую комнату
if (room.size === 0) {
rooms.delete(roomName);
}
}
function handleClientDisconnect(clientId) {
// Удалить клиента из всех комнат
rooms.forEach((room, roomName) => {
if (room.has(clientId)) {
room.delete(clientId);
broadcastToRoom(roomName, {
type: 'user_left',
clientId: clientId
});
if (room.size === 0) {
rooms.delete(roomName);
}
}
});
}
function generateId() {
return Math.random().toString(36).substr(2, 9);
}
// Запуск сервера
server.listen(8080, () => {
console.log('Сервер запущен на порту 8080');
});
C# (ASP.NET Core)
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
var app = builder.Build();
// Middleware для обработки WebSocket
app.UseWebSockets(new WebSocketOptions
{
KeepAliveInterval = TimeSpan.FromSeconds(30),
ReceiveBufferSize = 4 * 1024
});
app.Map("/ws", async context =>
{
if (!context.WebSockets.IsWebSocketRequest)
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
return;
}
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
await HandleWebSocketConnection(webSocket);
});
app.Run();
async Task HandleWebSocketConnection(WebSocket webSocket)
{
var clientId = Guid.NewGuid().ToString();
var buffer = new byte[4096];
Console.WriteLine($"Новое соединение: {clientId}");
try
{
while (webSocket.State == WebSocketState.Open)
{
var result = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Закрыто клиентом",
CancellationToken.None);
break;
}
// Обработка текстового сообщения
if (result.MessageType == WebSocketMessageType.Text)
{
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
await ProcessMessage(webSocket, clientId, message);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка соединения {clientId}: {ex.Message}");
}
finally
{
Console.WriteLine($"Соединение закрыто: {clientId}");
}
}
async Task ProcessMessage(WebSocket socket, string clientId, string message)
{
try
{
using var doc = JsonDocument.Parse(message);
var root = doc.RootElement;
if (!root.TryGetProperty("type", out var typeElement))
return;
var type = typeElement.GetString();
switch (type)
{
case "ping":
await SendMessage(socket, new
{
type = "pong",
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
});
break;
case "message":
if (root.TryGetProperty("text", out var textElement))
{
var response = new
{
type = "message",
clientId = clientId,
text = textElement.GetString(),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
await SendMessage(socket, response);
}
break;
default:
await SendMessage(socket, new
{
type = "error",
message = $"Неизвестный тип: {type}"
});
break;
}
}
catch (JsonException ex)
{
await SendMessage(socket, new
{
type = "error",
message = $"Ошибка парсинга JSON: {ex.Message}"
});
}
}
async Task SendMessage(WebSocket socket, object message)
{
if (socket.State != WebSocketState.Open)
return;
var json = JsonSerializer.Serialize(message);
var bytes = Encoding.UTF8.GetBytes(json);
await socket.SendAsync(
new ArraySegment<byte>(bytes),
WebSocketMessageType.Text,
true,
CancellationToken.None);
}
Python (FastAPI + WebSockets)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, Set
import json
import asyncio
app = FastAPI()
# Хранилище подключений
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.rooms: Dict[str, Set[str]] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
print(f"Новое соединение: {client_id}")
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
print(f"Соединение закрыто: {client_id}")
# Удалить из всех комнат
for room in self.rooms.values():
room.discard(client_id)
async def send_personal_message(self, message: dict, client_id: str):
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
await websocket.send_text(json.dumps(message))
async def broadcast(self, message: dict, room: str = None):
if room and room in self.rooms:
recipients = self.rooms[room]
else:
recipients = self.active_connections.keys()
data = json.dumps(message)
disconnected = []
for client_id in recipients:
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
try:
await websocket.send_text(data)
except Exception:
disconnected.append(client_id)
# Очистить мёртвые соединения
for client_id in disconnected:
self.disconnect(client_id)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
msg_type = message.get("type")
if msg_type == "ping":
await manager.send_personal_message({
"type": "pong",
"timestamp": asyncio.get_event_loop().time()
}, client_id)
elif msg_type == "join":
room = message.get("room")
if room:
if room not in manager.rooms:
manager.rooms[room] = set()
manager.rooms[room].add(client_id)
await manager.broadcast({
"type": "user_joined",
"client_id": client_id,
"room": room
}, room)
elif msg_type == "message":
room = message.get("room")
await manager.broadcast({
"type": "message",
"client_id": client_id,
"text": message.get("text"),
"timestamp": asyncio.get_event_loop().time()
}, room)
except WebSocketDisconnect:
manager.disconnect(client_id)
except Exception as e:
print(f"Ошибка: {e}")
manager.disconnect(client_id)
@app.get("/")
async def get():
return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<div id="messages"></div>
<input id="message" type="text" />
<button onclick="sendMessage()">Send</button>
<script>
const ws = new WebSocket("ws://localhost:8000/ws/test_client");
ws.onmessage = (event) => {
const messages = document.getElementById('messages');
const data = JSON.parse(event.data);
const message = document.createElement('div');
message.textContent = JSON.stringify(data);
messages.appendChild(message);
};
function sendMessage() {
const input = document.getElementById('message');
ws.send(JSON.stringify({
type: 'message',
text: input.value
}));
input.value = '';
}
</script>
</body>
</html>
""")
Java (Spring WebSocket)
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Configuration
public class WebSocketHandler extends TextWebSocketHandler {
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String sessionId = session.getId();
sessions.put(sessionId, session);
System.out.println("Новое соединение: " + sessionId);
try {
session.sendMessage(new TextMessage(
"{\"type\":\"welcome\",\"sessionId\":\"" + sessionId + "\"}"
));
} catch (Exception e) {
System.err.println("Ошибка отправки приветствия: " + e.getMessage());
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String sessionId = session.getId();
try {
String payload = message.getPayload();
System.out.println("Сообщение от " + sessionId + ": " + payload);
// Эхо-ответ
session.sendMessage(new TextMessage(
"{\"type\":\"echo\",\"original\":" + payload + "}"
));
} catch (Exception e) {
System.err.println("Ошибка обработки сообщения: " + e.getMessage());
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String sessionId = session.getId();
sessions.remove(sessionId);
System.out.println("Соединение закрыто: " + sessionId + " (" + status + ")");
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
System.err.println("Ошибка транспорта: " + exception.getMessage());
}
// Метод для отправки сообщения конкретному клиенту
public void sendMessageTo(String sessionId, String message) {
WebSocketSession session = sessions.get(sessionId);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (Exception e) {
System.err.println("Ошибка отправки: " + e.getMessage());
}
}
}
// Метод для рассылки всем клиентам
public void broadcast(String message) {
TextMessage textMessage = new TextMessage(message);
sessions.forEach((sessionId, session) -> {
if (session.isOpen()) {
try {
session.sendMessage(textMessage);
} catch (Exception e) {
System.err.println("Ошибка рассылки: " + e.getMessage());
}
}
});
}
}
Конфигурация WebSocket в Spring:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandler webSocketHandler;
public WebSocketConfig(WebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler, "/ws")
.setAllowedOrigins("*");
}
}
SSE
Server-Sent Events (SSE) предполагает, что сервер отправляет обновления клиенту через однонаправленный канал, это протокол для однонаправленной передачи данных от сервера к клиенту через HTTP. Сервер отправляет данные клиенту, но обратная связь невозможна. Если соединение разрывается, клиент автоматически переподключается. Пример - трансляция обновлений и уведомления в реальном времени.
Server-Sent Events — стандарт веб-API для односторонней потоковой передачи данных от сервера к клиенту через обычное HTTP-соединение. Клиент устанавливает соединение и получает события в режиме реального времени без необходимости повторных запросов.
Архитектурные особенности
Односторонняя связь
- Данные передаются только от сервера к клиенту.
- Клиент не может отправлять сообщения через установленное соединение (только через отдельные HTTP-запросы).
Постоянное соединение
- После установки соединение остаётся открытым до явного закрытия или таймаута.
- Сервер отправляет данные по мере их появления, без ожидания запроса от клиента.
Текстовый формат
- Передача данных осуществляется в виде текстовых событий в кодировке UTF-8.
- Поддержка автоматического парсинга браузером через
EventSourceAPI.
Автоматическое восстановление
- При разрыве соединения браузер автоматически пытается переподключиться.
- Интервал повторных попыток настраивается сервером через поле
retry.
Совместимость с инфраструктурой
- Работает поверх стандартного HTTP/HTTPS без необходимости специальных протоколов.
- Проходит через прокси, балансировщики и брандмауэры без дополнительной настройки.
Протокол и формат сообщений
HTTP-заголовки ответа
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
Content-Type: text/event-stream— обязательный заголовок, идентифицирующий SSE-поток.Cache-Control: no-cache— предотвращает кэширование промежуточными прокси.Connection: keep-alive— поддерживает постоянное соединение.
Формат события
event: message
data: {"type":"notification","text":"Новое сообщение"}
id: 12345
retry: 30000
data: Простое сообщение без типа
: Это комментарий (игнорируется клиентом)
Структура поля события:
event— тип события (по умолчаниюmessage).data— полезная нагрузка (может занимать несколько строк).id— уникальный идентификатор события для восстановления после переподключения.retry— интервал переподключения в миллисекундах (по умолчанию 3000 мс).
Многострочные данные
data: Первая строка
data: Вторая строка
data: Третья строка
После парсинга объединяются в одну строку с символами новой строки.
Разделитель событий
- Пустая строка (
\n\n) завершает событие. - Сервер должен отправлять
\n\nпосле каждого события.
Жизненный цикл соединения
1. Установление соединения
const eventSource = new EventSource('https://example.com/events');
2. Получение событий
eventSource.onmessage = (event) => {
console.log('Данные:', event.data);
};
eventSource.addEventListener('notification', (event) => {
console.log('Тип события:', event.type);
console.log('Данные:', event.data);
console.log('ID события:', event.lastEventId);
});
3. Автоматическое переподключение
- При разрыве соединения браузер повторяет запрос с заголовком
Last-Event-ID. - Сервер может возобновить поток с последнего полученного события.
GET /events HTTP/1.1
Host: example.com
Last-Event-ID: 12345
4. Закрытие соединения
eventSource.close();
Механизмы надёжности
Идентификаторы событий
- Поле
idпозволяет клиенту отслеживать последнее полученное событие. - При переподключении браузер отправляет
Last-Event-IDв заголовке запроса. - Сервер может возобновить поток с указанного события, предотвращая потерю данных.
Настройка интервала повтора
retry: 10000
- Значение в миллисекундах.
- Применяется ко всем последующим переподключениям до следующего изменения.
Обработка ошибок
eventSource.onerror = (error) => {
console.error('Ошибка соединения:', error);
// Проверка состояния
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Соединение закрыто');
} else if (eventSource.readyState === EventSource.CONNECTING) {
console.log('Попытка переподключения...');
}
};
Состояния соединения
EventSource.CONNECTING(0) — соединение отсутствует или устанавливается.EventSource.OPEN(1) — соединение открыто и готово к приёму событий.EventSource.CLOSED(2) — соединение закрыто.
JavaScript (клиент)
// Создание соединения
const eventSource = new EventSource('https://api.example.com/stream', {
withCredentials: true // Для передачи cookies
});
// Обработчик всех событий (тип по умолчанию 'message')
eventSource.onmessage = (event) => {
console.log('Получено сообщение:', event.data);
};
// Обработчик события с типом 'notification'
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('Уведомление:', data);
showNotification(data.title, data.body);
});
// Обработчик события с типом 'progress'
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
updateProgressBar(data.percent);
});
// Обработчик ошибок
eventSource.onerror = (error) => {
console.error('Ошибка SSE:', error);
// Проверка состояния соединения
switch (eventSource.readyState) {
case EventSource.CONNECTING:
console.log('Переподключение...');
break;
case EventSource.CLOSED:
console.log('Соединение закрыто');
// Можно создать новое соединение
break;
}
};
// Получение последнего ID события
console.log('Последнее событие ID:', eventSource.lastEventId);
// Закрытие соединения
function closeConnection() {
eventSource.close();
console.log('Соединение закрыто вручную');
}
// Пример использования с обработкой различных типов событий
const handlers = {
'message': (data) => console.log('Сообщение:', data),
'update': (data) => updateUI(data),
'error': (data) => showError(data),
'complete': (data) => {
console.log('Завершено:', data);
eventSource.close();
}
};
Object.entries(handlers).forEach(([type, handler]) => {
eventSource.addEventListener(type, (event) => {
try {
const data = JSON.parse(event.data);
handler(data);
} catch (e) {
console.error('Ошибка парсинга JSON:', e);
}
});
});
C# (ASP.NET Core)
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
[ApiController]
[Route("api/[controller]")]
public class EventsController : ControllerBase
{
[HttpGet]
public async Task GetEvents()
{
// Установка заголовков SSE
Response.Headers.Append("Content-Type", "text/event-stream");
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("Connection", "keep-alive");
// Отправка начального события
await SendEventAsync("connected", new { message = "Подключено к потоку событий" });
// Генерация событий в реальном времени
var cancellationToken = HttpContext.RequestAborted;
try
{
int eventId = 0;
while (!cancellationToken.IsCancellationRequested)
{
// Отправка события каждые 2 секунды
await SendEventAsync("tick", new
{
id = Interlocked.Increment(ref eventId),
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
message = $"Событие #{eventId}"
});
// Отправка события с указанием интервала повтора
if (eventId == 1)
{
await SendRetryAsync(5000); // 5 секунд
}
// Отправка комментария (игнорируется клиентом)
await Response.WriteAsync($": Серверное время {DateTime.Now}\n\n");
await Response.Body.FlushAsync(cancellationToken);
await Task.Delay(2000, cancellationToken);
}
}
catch (TaskCanceledException)
{
// Клиент закрыл соединение
Console.WriteLine("Клиент отключился");
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка потока: {ex.Message}");
}
finally
{
// Отправка финального события
await SendEventAsync("disconnected", new { message = "Поток завершён" });
}
}
[HttpGet("notifications")]
public async Task GetNotifications()
{
Response.Headers.Append("Content-Type", "text/event-stream");
Response.Headers.Append("Cache-Control", "no-cache");
var cancellationToken = HttpContext.RequestAborted;
// Получение последнего ID события из заголовка
var lastEventId = Request.Headers["Last-Event-ID"].ToString();
int lastId = string.IsNullOrEmpty(lastEventId) ? 0 : int.Parse(lastEventId);
Console.WriteLine($"Восстановление с события #{lastId}");
try
{
// Симуляция получения уведомлений из базы данных или очереди
while (!cancellationToken.IsCancellationRequested)
{
var notifications = GetNewNotifications(lastId);
foreach (var notification in notifications)
{
await SendEventAsync("notification", new
{
id = notification.Id,
type = notification.Type,
title = notification.Title,
body = notification.Body,
timestamp = notification.Timestamp
}, notification.Id.ToString());
lastId = notification.Id;
}
await Response.Body.FlushAsync(cancellationToken);
await Task.Delay(1000, cancellationToken);
}
}
catch (TaskCanceledException)
{
Console.WriteLine("Клиент отключился от уведомлений");
}
}
/// <summary>
/// Отправка события в формате SSE
/// </summary>
private async Task SendEventAsync(string eventType, object data, string id = null)
{
var sb = new StringBuilder();
if (!string.IsNullOrEmpty(id))
{
sb.AppendLine($"id: {id}");
}
sb.AppendLine($"event: {eventType}");
sb.AppendLine($"data: {System.Text.Json.JsonSerializer.Serialize(data)}");
sb.AppendLine();
await Response.WriteAsync(sb.ToString());
}
/// <summary>
/// Установка интервала повторного подключения
/// </summary>
private async Task SendRetryAsync(int milliseconds)
{
await Response.WriteAsync($"retry: {milliseconds}\n\n");
}
/// <summary>
/// Симуляция получения новых уведомлений
/// </summary>
private List<Notification> GetNewNotifications(int afterId)
{
// В реальном приложении: запрос к базе данных или очереди сообщений
var notifications = new List<Notification>();
// Пример генерации
var random = new Random();
if (random.Next(10) > 7)
{
notifications.Add(new Notification
{
Id = afterId + 1,
Type = "info",
Title = "Новое уведомление",
Body = $"Случайное событие #{afterId + 1}",
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
});
}
return notifications;
}
private class Notification
{
public int Id { get; set; }
public string Type { get; set; }
public string Title { get; set; }
public string Body { get; set; }
public long Timestamp { get; set; }
}
}
Python (FastAPI)
from fastapi import FastAPI, Response, Request, HTTPException
from fastapi.responses import StreamingResponse
from typing import Dict, Any
import asyncio
import json
import time
import uvicorn
app = FastAPI()
def generate_sse_event(
event_type: str = "message",
data: Any = None,
event_id: str = None,
retry: int = None
) -> str:
"""Форматирование события в формате SSE"""
lines = []
if event_id is not None:
lines.append(f"id: {event_id}")
if event_type != "message":
lines.append(f"event: {event_type}")
if data is not None:
if isinstance(data, dict):
data_str = json.dumps(data)
else:
data_str = str(data)
# Многострочные данные
for line in data_str.split('\n'):
lines.append(f"data: {line}")
if retry is not None:
lines.append(f"retry: {retry}")
lines.append("") # Пустая строка завершает событие
lines.append("") # Двойной перевод строки
return "\n".join(lines)
async def event_stream(request: Request, start_id: int = 0):
"""Генератор событий для SSE"""
event_id = start_id
# Отправка начального события
yield generate_sse_event(
event_type="connected",
data={"message": "Подключено к потоку событий", "start_id": start_id},
event_id=str(event_id)
)
event_id += 1
# Установка интервала повтора
yield generate_sse_event(retry=5000)
try:
while True:
# Проверка отключения клиента
if await request.is_disconnected():
print("Клиент отключился")
break
# Генерация события
event_data = {
"id": event_id,
"timestamp": time.time(),
"message": f"Событие #{event_id}"
}
yield generate_sse_event(
event_type="tick",
data=event_data,
event_id=str(event_id)
)
event_id += 1
# Отправка комментария
yield f": Серверное время {time.strftime('%H:%M:%S')}\n\n"
await asyncio.sleep(2)
except asyncio.CancelledError:
print("Поток отменён")
raise
except Exception as e:
print(f"Ошибка в потоке: {e}")
yield generate_sse_event(
event_type="error",
data={"message": str(e)}
)
finally:
# Финальное событие
yield generate_sse_event(
event_type="disconnected",
data={"message": "Поток завершён"}
)
@app.get("/events")
async def sse_endpoint(request: Request):
"""Эндпоинт для SSE потока"""
# Получение последнего ID события
last_event_id = request.headers.get("Last-Event-ID")
start_id = int(last_event_id) if last_event_id else 0
print(f"Подключение клиента, восстановление с события #{start_id}")
return StreamingResponse(
event_stream(request, start_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключение буферизации nginx
}
)
# Симуляция уведомлений
notifications_store: Dict[int, Dict] = {}
notification_counter = 0
async def notification_stream(request: Request):
"""Поток уведомлений"""
global notification_counter
last_id = 0
# Получение последнего события
last_event_id = request.headers.get("Last-Event-ID")
if last_event_id:
last_id = int(last_event_id)
print(f"Восстановление уведомлений с #{last_id}")
try:
while True:
if await request.is_disconnected():
break
# Проверка новых уведомлений
new_notifications = [
(nid, data) for nid, data in notifications_store.items()
if nid > last_id
]
for nid, data in sorted(new_notifications):
yield generate_sse_event(
event_type="notification",
data=data,
event_id=str(nid)
)
last_id = nid
await asyncio.sleep(1)
except asyncio.CancelledError:
raise
@app.get("/notifications")
async def notifications_endpoint(request: Request):
"""Эндпоинт для потока уведомлений"""
return StreamingResponse(
notification_stream(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
@app.post("/notify")
async def create_notification(notification: Dict[str, Any]):
"""Создание нового уведомления"""
global notification_counter
notification_counter += 1
notification_data = {
"id": notification_counter,
"type": notification.get("type", "info"),
"title": notification.get("title", "Уведомление"),
"body": notification.get("body", ""),
"timestamp": time.time()
}
notifications_store[notification_counter] = notification_data
return {"status": "created", "id": notification_counter}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Java (Spring WebFlux)
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@RestController
public class SseController {
// Хранилище для клиентских потоков
private final Map<String, Sinks.Many<String>> clientSinks = new ConcurrentHashMap<>();
private final AtomicInteger clientIdGenerator = new AtomicInteger(0);
/**
* Простой поток событий с фиксированным интервалом
*/
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> getEvents() {
return Flux.interval(Duration.ofSeconds(2))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("tick")
.data(String.format("{\"sequence\":%d,\"timestamp\":%d}",
sequence, System.currentTimeMillis()))
.build());
}
/**
* Поток с автоматическим переподключением
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> eventStream() {
return Flux.interval(Duration.ofSeconds(1))
.take(30) // Ограничение для демонстрации
.map(sequence -> {
if (sequence == 0) {
// Первое событие с указанием интервала повтора
return ServerSentEvent.<String>builder()
.event("connected")
.data("{\"status\":\"connected\"}")
.build();
} else if (sequence == 1) {
// Установка интервала повтора
return ServerSentEvent.<String>builder()
.comment("retry: 5000")
.build();
} else {
return ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("message")
.data(String.format("{\"count\":%d,\"time\":%d}",
sequence, System.currentTimeMillis()))
.build();
}
});
}
/**
* Broadcast уведомлений для всех подключенных клиентов
*/
@GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> notificationsStream() {
String clientId = "client-" + clientIdGenerator.incrementAndGet();
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE);
clientSinks.put(clientId, sink);
System.out.println("Новый клиент подключился: " + clientId);
// Отправка приветственного сообщения
sink.tryEmitNext("{\"type\":\"connected\",\"clientId\":\"" + clientId + "\"}");
return sink.asFlux()
.map(data -> ServerSentEvent.<String>builder()
.data(data)
.build())
.doFinally(signalType -> {
System.out.println("Клиент отключился: " + clientId);
clientSinks.remove(clientId);
});
}
/**
* Отправка уведомления всем клиентам
*/
public void broadcastNotification(String type, String title, String body) {
String message = String.format(
"{\"type\":\"%s\",\"title\":\"%s\",\"body\":\"%s\",\"timestamp\":%d}",
type, title, body, System.currentTimeMillis()
);
clientSinks.forEach((clientId, sink) -> {
sink.tryEmitNext(message);
});
}
/**
* Пример использования broadcast
*/
public void simulateNotifications() {
new Thread(() -> {
int count = 0;
while (true) {
try {
Thread.sleep(3000);
count++;
broadcastNotification(
"info",
"Уведомление #" + count,
"Тестовое уведомление в " + System.currentTimeMillis()
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
}
Spring MVC альтернатива (с использованием SseEmitter):
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class SseEmitterController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
@GetMapping(value = "/sse", produces = "text/event-stream")
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // Без таймаута
String emitterId = "emitter-" + System.currentTimeMillis();
emitters.put(emitterId, emitter);
// Настройка обработчиков
emitter.onCompletion(() -> {
System.out.println("SSE завершён: " + emitterId);
emitters.remove(emitterId);
});
emitter.onTimeout(() -> {
System.out.println("SSE таймаут: " + emitterId);
emitters.remove(emitterId);
});
emitter.onError(error -> {
System.out.println("SSE ошибка: " + emitterId + " - " + error.getMessage());
emitters.remove(emitterId);
});
// Отправка начального сообщения
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("{\"status\":\"connected\",\"id\":\"" + emitterId + "\"}"));
} catch (IOException e) {
e.printStackTrace();
}
// Запуск потока отправки событий
executor.submit(() -> {
int count = 0;
try {
while (true) {
Thread.sleep(2000);
if (!emitters.containsKey(emitterId)) {
break;
}
count++;
emitter.send(SseEmitter.event()
.id(String.valueOf(count))
.name("tick")
.data("{\"count\":" + count + ",\"time\":" + System.currentTimeMillis() + "}"));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
System.out.println("Ошибка отправки: " + e.getMessage());
emitters.remove(emitterId);
}
});
return emitter;
}
/**
* Broadcast сообщения всем подключенным клиентам
*/
public void broadcast(String eventName, Object data) {
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name(eventName)
.data(data));
} catch (IOException e) {
System.out.println("Ошибка broadcast для " + id + ": " + e.getMessage());
emitters.remove(id);
}
});
}
}
Node.js (Express)
const express = require('express');
const app = express();
/**
* Простой SSE поток
*/
app.get('/events', (req, res) => {
// Установка заголовков
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Отключение буферизации nginx
// Получение последнего события
const lastEventId = req.headers['last-event-id'];
let eventId = lastEventId ? parseInt(lastEventId) : 0;
console.log(`Клиент подключился, последнее событие: ${lastEventId || 'нет'}`);
// Отправка начального события
sendEvent(res, 'connected', { message: 'Подключено к потоку' }, ++eventId);
// Установка интервала повтора
res.write(`retry: 5000\n\n`);
// Генерация событий
const interval = setInterval(() => {
if (res.writableEnded) {
clearInterval(interval);
console.log('Клиент отключился');
return;
}
const data = {
id: ++eventId,
timestamp: Date.now(),
message: `Событие #${eventId}`
};
sendEvent(res, 'tick', data, eventId.toString());
// Отправка комментария
res.write(`: Серверное время ${new Date().toISOString()}\n\n`);
res.flush();
}, 2000);
// Обработка отключения
req.on('close', () => {
clearInterval(interval);
console.log('Соединение закрыто клиентом');
});
// Обработка ошибок
res.on('error', (err) => {
clearInterval(interval);
console.error('Ошибка потока:', err);
});
});
/**
* Поток уведомлений
*/
const notifications = [];
let notificationId = 0;
app.get('/notifications', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const lastEventId = req.headers['last-event-id'];
let lastId = lastEventId ? parseInt(lastEventId) : 0;
console.log(`Уведомления: восстановление с #${lastId}`);
// Отправка накопленных уведомлений
notifications
.filter(n => n.id > lastId)
.forEach(notification => {
sendEvent(res, 'notification', notification.data, notification.id.toString());
});
// Отслеживание новых уведомлений
const checkInterval = setInterval(() => {
if (res.writableEnded) {
clearInterval(checkInterval);
return;
}
const newNotifications = notifications.filter(n => n.id > lastId);
newNotifications.forEach(notification => {
sendEvent(res, 'notification', notification.data, notification.id.toString());
lastId = notification.id;
});
if (newNotifications.length > 0) {
res.flush();
}
}, 1000);
req.on('close', () => {
clearInterval(checkInterval);
});
});
/**
* Создание уведомления
*/
app.post('/notify', express.json(), (req, res) => {
notificationId++;
const notification = {
id: notificationId,
data: {
type: req.body.type || 'info',
title: req.body.title || 'Уведомление',
body: req.body.body || '',
timestamp: Date.now()
}
};
notifications.push(notification);
console.log(`Новое уведомление #${notificationId}:`, notification.data);
res.json({ status: 'created', id: notificationId });
});
/**
* Вспомогательная функция отправки события
*/
function sendEvent(res, eventType, data, id = null) {
if (id) {
res.write(`id: ${id}\n`);
}
if (eventType !== 'message') {
res.write(`event: ${eventType}\n`);
}
const dataStr = typeof data === 'string' ? data : JSON.stringify(data);
dataStr.split('\n').forEach(line => {
res.write(`data: ${line}\n`);
});
res.write('\n'); // Пустая строка завершает событие
}
// Запуск сервера
const PORT = 3000;
app.listen(PORT, () => {
console.log(`Сервер запущен на порту ${PORT}`);
});
Event Streaming
Основы Event Streaming
Event Streaming — это потоковые платформы, такие как Apache Kafka, позволяют системам подписываться на события и реагировать на них.
События сохраняются в упорядоченном, неизменяемом журнале и могут быть потреблены множеством независимых приложений в реальном времени или с отсрочкой.
Событие (Event)
- Атомарная запись о произошедшем факте в системе.
- Содержит: тип события, временну́ю метку, идентификаторы, полезную нагрузку.
- Неизменяемо после создания.
Топик (Topic)
- Логический канал для категоризации событий.
- Разделяет события по доменам или типам (например,
orders.created,users.registered). - Поддерживает публикацию от множества производителей и потребление множеством подписчиков.
Партиция (Partition)
- Горизонтальное разделение топика для масштабирования и параллелизма.
- События внутри партиции упорядочены.
- Ключ события определяет принадлежность к партиции (хеширование).
Смещение (Offset)
- Уникальный идентификатор события внутри партиции.
- Позволяет потребителю отслеживать позицию в потоке.
- Гарантирует доставку «хотя бы один раз» при сохранении смещения.
Группа потребителей (Consumer Group)
- Набор потребителей, совместно обрабатывающих топик.
- Каждая партиция назначается одному потребителю в группе.
- Обеспечивает масштабирование обработки и отказоустойчивость.
Retention Policy
- Политика хранения событий: по времени (например, 7 дней) или по объёму (например, 100 ГБ).
- Позволяет повторную обработку событий в течение заданного окна.
Потоковые операторы — это инструменты для обработки потоков данных в реальном времени. Они позволяют выполнять преобразования, фильтрацию и агрегацию данных.
Примеры потоковых операторов:
- Map — преобразует каждый элемент потока. Пример - увеличить все числа в потоке на 1:
stream.map(x => x + 1);
- Filter — отбирает только те элементы, кторые удовлетворяют условию. Пример - оставить только положительные числа:
stream.filter(x => x > 0);
- Reduce — агрегирует данные в один результат. Пример - подсчитать сумму всех чисел в потоке:
stream.reduce((acc, x) => acc + x, 0);
- FlatMap — преобразует каждый элемент в несколько элементов. Пример - разбить строку на слова:
stream.flatMap(sentence => sentence.split(' '));
Потоковые операторы в интеграции используются как раз в Apache Kafka Streams. На практике это обработка событий в реальном времени (например, подсчёт количества кликов пользователей).
Kafta и MQTT - не только асинхронные протоколы, но и реактивные.
Реактивные системы обеспечивают мгновенный обмен данными, и они подходят для работы с большим количеством событий и пользователей. Такое можно встретить в чатах, онлайн-играх, биржевых платформах, системах мониторинга.
Apache Kafka
Архитектура
- Распределённый журнал (distributed commit log) с репликацией.
- Кластер состоит из брокеров (brokers), каждый хранит подмножество партиций.
- ZooKeeper (в версиях до 2.8) или KRaft (Kafka Raft) для координации кластера.
Гарантии
- Упорядоченность в пределах партиции.
- Персистентность через запись на диск с сегментацией.
- Репликация с настраиваемым фактором (replication factor).
- Подтверждение записи:
acks=0(без подтверждения),acks=1(лидер),acks=all(все реплики).
Компоненты экосистемы
- Kafka Connect — коннекторы для интеграции с внешними системами (базы данных, хранилища).
- Kafka Streams — библиотека для потоковой обработки на стороне приложения.
- ksqlDB — SQL-подобный движок для потоковых запросов.
- Schema Registry — централизованное управление схемами сообщений (Avro, Protobuf).
Apache Pulsar
Архитектура
- Разделение вычислений и хранения: брокеры обрабатывают запросы, BookKeeper хранит данные.
- Многоуровневое хранение (tiered storage): горячие данные в BookKeeper, холодные в объектном хранилище (S3, GCS).
Отличия от Kafka
- Встроенная поддержка multi-tenancy с изоляцией на уровне тенантов и пространств имён.
- Гео-репликация на уровне кластера.
- Гибкая политика хранения с автоматическим перемещением данных.
NATS Streaming (STAN) / JetStream
Особенности
- Лёгкий протокол с минимальным оверхедом.
- JetStream (встроен в NATS 2.0+) добавляет персистентность и потоковую обработку.
- Поддержка «интересных» подписок (interest-based) и потоков (streams).
RabbitMQ Streams
Особенности
- Расширение RabbitMQ для потоковой обработки.
- Сохраняет совместимость с AMQP и добавляет семантику журналов.
- Поддержка offset tracking и повторного чтения.
Event Sourcing
Принцип
- Состояние агрегата определяется последовательностью событий, а не текущим снимком.
- События сохраняются в журнале как единственный источник истины.
Преимущества
- Полная история изменений для аудита и отладки.
- Возможность восстановления состояния на любой момент времени.
- Упрощение распределённых транзакций через компенсирующие события.
Реализация
// События
public record OrderCreated(Guid OrderId, string CustomerId, DateTime Timestamp);
public record OrderItemAdded(Guid OrderId, Product Product, int Quantity);
public record OrderCompleted(Guid OrderId, decimal TotalAmount);
// Агрегат
public class Order
{
private readonly List<object> _events = new();
public Guid Id { get; private set; }
public string CustomerId { get; private set; }
public List<OrderItem> Items { get; } = new();
public bool IsCompleted { get; private set; }
public void Create(string customerId)
{
var ev = new OrderCreated(Id, customerId, DateTime.UtcNow);
Apply(ev);
_events.Add(ev);
}
public void AddItem(Product product, int quantity)
{
var ev = new OrderItemAdded(Id, product, quantity);
Apply(ev);
_events.Add(ev);
}
public void Complete(decimal totalAmount)
{
var ev = new OrderCompleted(Id, totalAmount);
Apply(ev);
_events.Add(ev);
}
private void Apply(object ev)
{
switch (ev)
{
case OrderCreated created:
Id = created.OrderId;
CustomerId = created.CustomerId;
break;
case OrderItemAdded added:
Items.Add(new OrderItem(added.Product, added.Quantity));
break;
case OrderCompleted _:
IsCompleted = true;
break;
}
}
public IEnumerable<object> GetUncommittedEvents() => _events;
public void ClearEvents() => _events.Clear();
}
CQRS (Command Query Responsibility Segregation)
Принцип
- Разделение операций записи (команды) и чтения (запросы).
- Команды публикуют события в поток.
- Проекции (read models) обрабатывают события и обновляют материализованные представления.
Архитектура
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Command │─────▶│ Event │─────▶│ Event │
│ Handler │ │ Bus/Stream │ │ Store │
└─────────────┘ └──────────────┘ └──────────────┘
│
▼
┌─────────────────────┐
│ Projection/ │
│ Read Model │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ Query Handler │
└─────────────────────┘
Event-Driven Microservices
Принцип
- Микросервисы взаимодействуют через события, а не через синхронные вызовы.
- Каждый сервис имеет собственный контекст и базу данных.
- События служат для распространения изменений между границами контекстов.
Пример потока
1. Order Service: OrderCreated → Kafka (topic: orders)
2. Payment Service: потребляет OrderCreated → создаёт платеж → PaymentProcessed → Kafka (topic: payments)
3. Inventory Service: потребляет OrderCreated → резервирует товар → InventoryReserved → Kafka (topic: inventory)
4. Notification Service: потребляет все события → отправляет уведомления
Change Data Capture (CDC)
Принцип
- Перехват изменений в базе данных и публикация их как событий.
- Позволяет синхронизировать данные между системами без изменения прикладного кода.
Инструменты
- Debezium — CDC для реляционных баз (PostgreSQL, MySQL) и MongoDB.
- Maxwell — MySQL binlog reader.
- Kafka Connect с коннекторами JDBC, JMS.
C# (.NET) с Confluent Kafka
Производитель событий
using Confluent.Kafka;
using System.Text.Json;
public class OrderEventProducer
{
private readonly IProducer<string, string> _producer;
private readonly string _topic = "orders";
public OrderEventProducer(string bootstrapServers)
{
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers,
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5
};
_producer = new ProducerBuilder<string, string>(config).Build();
}
public async Task ProduceOrderCreatedAsync(Order order)
{
var eventData = new
{
type = "OrderCreated",
orderId = order.Id,
customerId = order.CustomerId,
items = order.Items,
timestamp = DateTimeOffset.UtcNow
};
var message = new Message<string, string>
{
Key = order.Id.ToString(),
Value = JsonSerializer.Serialize(eventData)
};
try
{
var result = await _producer.ProduceAsync(_topic, message);
Console.WriteLine($"Событие отправлено: {result.TopicPartitionOffset}");
}
catch (ProduceException<string, string> ex)
{
Console.WriteLine($"Ошибка отправки: {ex.Error.Reason}");
throw;
}
}
public void Dispose()
{
_producer?.Dispose();
}
}
Потребитель событий
using Confluent.Kafka;
public class OrderEventConsumer
{
private readonly IConsumer<string, string> _consumer;
private readonly string _topic = "orders";
private readonly CancellationTokenSource _cts = new();
public OrderEventConsumer(string bootstrapServers, string groupId)
{
var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = groupId,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
SessionTimeoutMs = 30000
};
_consumer = new ConsumerBuilder<string, string>(config)
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Назначены партиции: {string.Join(',', partitions)}");
})
.Build();
}
public async Task StartAsync()
{
_consumer.Subscribe(_topic);
try
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
var result = _consumer.Consume(_cts.Token);
Console.WriteLine($"Получено событие: {result.Message.Key}");
ProcessEvent(result.Message.Value);
// Подтверждение обработки
_consumer.Commit(result);
}
catch (ConsumeException ex)
{
Console.WriteLine($"Ошибка потребления: {ex.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Потребление остановлено");
}
finally
{
_consumer.Close();
}
}
private void ProcessEvent(string eventData)
{
// Обработка события
Console.WriteLine($"Обработка: {eventData}");
}
public void Stop()
{
_cts.Cancel();
}
public void Dispose()
{
_consumer?.Dispose();
_cts?.Dispose();
}
}
Потоковая обработка с Kafka Streams
// Пример агрегации с использованием Kafka Streams API (Java-библиотека через IKVM или REST Proxy)
// Для .NET можно использовать Kafka REST Proxy или Confluent Cloud ksqlDB
public class OrderAnalytics
{
// Подсчёт заказов по клиентам
/*
CREATE TABLE customer_order_count AS
SELECT customer_id, COUNT(*) AS order_count
FROM orders
GROUP BY customer_id
EMIT CHANGES;
*/
// Средний чек по периодам
/*
CREATE TABLE avg_order_value AS
SELECT
WINDOWSTART() as period_start,
COUNT(*) as order_count,
AVG(total_amount) as avg_amount
FROM orders
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY WINDOWSTART()
EMIT CHANGES;
*/
}
Python с confluent-kafka
Производитель
from confluent_kafka import Producer
import json
import uuid
from datetime import datetime
class OrderProducer:
def __init__(self, bootstrap_servers):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'acks': 'all',
'enable.idempotence': True
})
self.topic = 'orders'
def delivery_report(self, err, msg):
if err is not None:
print(f'Ошибка доставки: {err}')
else:
print(f'Сообщение доставлено: {msg.topic()} [{msg.partition()}]')
def produce_order_created(self, order_id, customer_id, items):
event = {
'type': 'OrderCreated',
'order_id': str(order_id),
'customer_id': customer_id,
'items': items,
'timestamp': datetime.utcnow().isoformat()
}
self.producer.produce(
topic=self.topic,
key=str(order_id),
value=json.dumps(event),
callback=self.delivery_report
)
# Ожидание доставки всех сообщений
self.producer.flush()
def close(self):
self.producer.flush()
# Использование
producer = OrderProducer('localhost:9092')
producer.produce_order_created(
uuid.uuid4(),
'customer_123',
[{'product_id': 'p1', 'quantity': 2, 'price': 100.0}]
)
producer.close()
Потребитель
from confluent_kafka import Consumer, KafkaException
import json
class OrderConsumer:
def __init__(self, bootstrap_servers, group_id):
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'session.timeout.ms': 30000
})
self.topic = 'orders'
def start(self):
self.consumer.subscribe([self.topic])
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
try:
event = json.loads(msg.value().decode('utf-8'))
self.process_event(event)
# Подтверждение обработки
self.consumer.commit(asynchronous=False)
except json.JSONDecodeError as e:
print(f'Ошибка парсинга JSON: {e}')
except Exception as e:
print(f'Ошибка обработки события: {e}')
# Можно отправить в DLQ (dead letter queue)
except KeyboardInterrupt:
print('Потребление остановлено')
finally:
self.consumer.close()
def process_event(self, event):
event_type = event.get('type')
if event_type == 'OrderCreated':
print(f"Создан заказ: {event['order_id']}")
# Логика обработки...
elif event_type == 'OrderCompleted':
print(f"Завершён заказ: {event['order_id']}")
else:
print(f"Неизвестный тип события: {event_type}")
# Использование
consumer = OrderConsumer('localhost:9092', 'order-processing-group')
consumer.start()
Потоковая обработка с Faust
import faust
from datetime import datetime
app = faust.App(
'order-processing',
broker='kafka://localhost:9092',
store='memory://',
version=1
)
class OrderEvent(faust.Record, serializer='json'):
type: str
order_id: str
customer_id: str
items: list
timestamp: str
orders_topic = app.topic('orders', value_type=OrderEvent)
# Таблица для агрегации по клиентам
customer_orders = app.Table(
'customer_orders',
default=int
)
@app.agent(orders_topic)
async def process_orders(stream):
async for event in stream:
if event.type == 'OrderCreated':
print(f"Обработка заказа {event.order_id}")
# Инкремент счётчика заказов клиента
customer_orders[event.customer_id] += 1
# Логика обработки...
await process_order(event)
async def process_order(event):
# Бизнес-логика
pass
# Периодическая задача для отчётов
@app.timer(interval=60.0)
async def report_stats():
total = sum(customer_orders.values())
print(f"Статистика: всего заказов = {total}")
for customer, count in customer_orders.items():
print(f" Клиент {customer}: {count} заказов")
if __name__ == '__main__':
app.main()
Java с Spring Kafka
Конфигурация
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
private final String bootstrapServers = "localhost:9092";
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(
org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE
);
return factory;
}
}
Производитель
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Service
public class OrderEventProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final String topic = "orders";
public OrderEventProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = new ObjectMapper();
}
public CompletableFuture<SendResult<String, String>> produceOrderCreated(
UUID orderId, String customerId, OrderItem[] items) {
try {
OrderEvent event = new OrderEvent(
"OrderCreated",
orderId.toString(),
customerId,
items,
System.currentTimeMillis()
);
String value = objectMapper.writeValueAsString(event);
return kafkaTemplate.send(topic, orderId.toString(), value)
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Событие отправлено: " +
result.getRecordMetadata().offset());
} else {
System.err.println("Ошибка отправки: " + ex.getMessage());
}
});
} catch (Exception e) {
throw new RuntimeException("Ошибка сериализации", e);
}
}
private static class OrderEvent {
public String type;
public String orderId;
public String customerId;
public OrderItem[] items;
public long timestamp;
public OrderEvent(String type, String orderId, String customerId,
OrderItem[] items, long timestamp) {
this.type = type;
this.orderId = orderId;
this.customerId = customerId;
this.items = items;
this.timestamp = timestamp;
}
}
private static class OrderItem {
public String productId;
public int quantity;
public double price;
}
}
Потребитель
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class OrderEventConsumer {
private final ObjectMapper objectMapper = new ObjectMapper();
@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void consume(@Payload String eventData, Acknowledgment ack) {
try {
OrderEvent event = objectMapper.readValue(eventData, OrderEvent.class);
System.out.println("Получено событие: " + event.type +
" для заказа " + event.orderId);
processEvent(event);
// Подтверждение обработки
ack.acknowledge();
} catch (Exception e) {
System.err.println("Ошибка обработки события: " + e.getMessage());
// Можно отправить в DLQ
}
}
private void processEvent(OrderEvent event) {
switch (event.type) {
case "OrderCreated":
handleOrderCreated(event);
break;
case "OrderCompleted":
handleOrderCompleted(event);
break;
default:
System.out.println("Неизвестный тип события: " + event.type);
}
}
private void handleOrderCreated(OrderEvent event) {
System.out.println("Обработка созданного заказа: " + event.orderId);
// Бизнес-логика...
}
private void handleOrderCompleted(OrderEvent event) {
System.out.println("Обработка завершённого заказа: " + event.orderId);
// Бизнес-логика...
}
private static class OrderEvent {
public String type;
public String orderId;
public String customerId;
public OrderItem[] items;
public long timestamp;
}
private static class OrderItem {
public String productId;
public int quantity;
public double price;
}
}
Потоковая обработка с Kafka Streams
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Properties;
@Component
public class OrderAnalyticsStreams {
@Bean
public KafkaStreams orderAnalyticsStreams() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Чтение событий заказов
KStream<String, String> orders = builder.stream("orders");
// Фильтрация только созданных заказов
KStream<String, OrderEvent> createdOrders = orders
.filter((key, value) -> value.contains("\"type\":\"OrderCreated\""))
.mapValues(value -> parseOrderEvent(value));
// Агрегация по клиентам
KTable<String, Long> customerOrderCount = createdOrders
.groupBy((key, event) -> event.customerId)
.count(Materialized.as("customer-order-count"));
// Подсчёт среднего чека по часам
KTable<Windowed<String>, Double> hourlyAvgOrderValue = createdOrders
.filter((key, event) -> event.totalAmount > 0)
.groupBy((key, event) -> "all")
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
() -> 0.0,
(key, event, total) -> total + event.totalAmount,
(key, event, total) -> total - event.totalAmount,
Materialized.as("hourly-total-amount")
)
.mapValues(total -> total / 100.0); // Пример вычисления
// Вывод результатов в топики
customerOrderCount
.toStream()
.to("customer-order-count", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
return streams;
}
private OrderEvent parseOrderEvent(String json) {
// Парсинг JSON в объект OrderEvent
return new OrderEvent();
}
private static class OrderEvent {
String customerId;
double totalAmount;
}
}
Node.js с kafkajs
Производитель
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092']
});
const producer = kafka.producer({
acks: -1, // all replicas
idempotent: true
});
async function produceOrderCreated(orderId, customerId, items) {
await producer.connect();
const event = {
type: 'OrderCreated',
orderId: orderId,
customerId: customerId,
items: items,
timestamp: new Date().toISOString()
};
await producer.send({
topic: 'orders',
messages: [{
key: orderId,
value: JSON.stringify(event)
}]
});
console.log(`Событие отправлено: ${orderId}`);
await producer.disconnect();
}
// Использование
produceOrderCreated(
'order-123',
'customer-456',
[{ productId: 'p1', quantity: 2, price: 100 }]
).catch(console.error);
Потребитель
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-processor',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({
groupId: 'order-processing-group',
sessionTimeout: 30000
});
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value.toString());
console.log(`Получено событие: ${event.type}`);
await processEvent(event);
// Смещение автоматически подтверждается
} catch (error) {
console.error('Ошибка обработки:', error);
// Можно отправить в DLQ
}
}
});
}
async function processEvent(event) {
switch (event.type) {
case 'OrderCreated':
console.log(`Создан заказ: ${event.orderId}`);
// Логика обработки...
break;
case 'OrderCompleted':
console.log(`Завершён заказ: ${event.orderId}`);
break;
default:
console.log(`Неизвестный тип: ${event.type}`);
}
}
// Запуск
startConsumer().catch(console.error);
// Graceful shutdown
process.on('SIGTERM', async () => {
await consumer.disconnect();
process.exit(0);
});
Потоковая обработка с ksqlDB
// HTTP-запросы к ksqlDB REST API
const fetch = require('node-fetch');
async function createStream() {
const query = `
CREATE STREAM orders_stream (
type VARCHAR,
orderId VARCHAR,
customerId VARCHAR,
items ARRAY<STRUCT<productId VARCHAR, quantity INT, price DOUBLE>>,
timestamp VARCHAR
) WITH (
kafka_topic='orders',
value_format='json',
partitions=4
);
`;
await fetch('http://localhost:8088/ksql', {
method: 'POST',
headers: { 'Content-Type': 'application/vnd.ksql.v1+json' },
body: JSON.stringify({ ksql: query })
});
}
async function createAggregation() {
const query = `
CREATE TABLE customer_order_count AS
SELECT customerId, COUNT(*) AS orderCount
FROM orders_stream
GROUP BY customerId
EMIT CHANGES;
`;
await fetch('http://localhost:8088/ksql', {
method: 'POST',
headers: { 'Content-Type': 'application/vnd.ksql.v1+json' },
body: JSON.stringify({ ksql: query })
});
}
async function queryOrders() {
const response = await fetch('http://localhost:8088/query', {
method: 'POST',
headers: { 'Content-Type': 'application/vnd.ksql.v1+json' },
body: JSON.stringify({
ksql: "SELECT * FROM customer_order_count WHERE customerId = 'customer-456';"
})
});
const data = await response.json();
console.log('Результаты:', data);
}